package com.shujia.flink.core;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Streaming word count over a socket text source.
 *
 * <p>Start a text source first, e.g. {@code nc -lk 8888} on the target host, then run this job.
 * Each line received from the socket is treated as a single word (lines are NOT split on
 * whitespace). Each line is paired with a count of 1, keyed by the word and summed. The running
 * totals are printed to stdout.
 *
 * <p>Optional program arguments: {@code [host] [port]}. When omitted, the job connects to
 * {@code master:8888}.
 */
public class Demo1StreamWordCount {

    /** Socket host used when no host argument is supplied. */
    private static final String DEFAULT_HOST = "master";

    /** Socket port used when no port argument is supplied. */
    private static final int DEFAULT_PORT = 8888;

    /** Job-wide parallelism: every operator runs as this many parallel subtasks. */
    private static final int PARALLELISM = 2;

    /**
     * Maximum time in milliseconds an output buffer waits before it is flushed downstream, even if
     * it is not full. Flink's default is 100 ms; this job deliberately uses 200 ms.
     */
    private static final long BUFFER_TIMEOUT_MS = 200L;

    /**
     * Builds and runs the word-count job.
     *
     * @param args optional {@code [host] [port]} of the socket text source
     * @throws IllegalArgumentException if the port argument is not a valid TCP port
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        final String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        final int port = args.length > 1 ? parsePort(args[1]) : DEFAULT_PORT;

        // 1. Create the Flink execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each operator runs as PARALLELISM parallel subtasks.
        env.setParallelism(PARALLELISM);

        // Upper bound on how long records may sit in a network buffer before being sent downstream.
        env.setBufferTimeout(BUFFER_TIMEOUT_MS);

        // 2. Read text lines from the socket (e.g. a source started with: nc -lk 8888).
        DataStream<String> wordsDS = env.socketTextStream(host, port);

        // 3. Pair every line with a count of 1. The explicit type hint is supplied because Flink
        //    generally cannot extract Tuple2's generic parameters from a Java lambda.
        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        // Group by the word (field f0).
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // Keep a running sum of field 1 (the count) per key.
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);

        // Print the running counts.
        countDS.print();

        // Submit the job; nothing runs until execute() is called.
        env.execute("Demo1StreamWordCount");
    }

    /**
     * Parses a TCP port argument.
     *
     * @param raw the textual port value
     * @return the port number in the range [1, 65535]
     * @throws IllegalArgumentException if {@code raw} is not an integer or is out of range
     */
    private static int parsePort(String raw) {
        final int port;
        try {
            port = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Invalid port: " + raw, e);
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Port out of range [1, 65535]: " + port);
        }
        return port;
    }
}
